{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Verifying the MLOps environment on GCP with Cloud AI Platform Training custom container\n",
    "\n",
    "This notebook verifies the MLOps environment provisioned on GCP:\n",
    "1. Create a trainer module and submit a Cloud AI Platform Training job using a custom container\n",
    "2. Verify the result by inspecting the training log entries in Cloud SQL\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Create and submit Cloud AI Platform Training job\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import re\n",
    "from IPython.core.display import display, HTML\n",
    "from datetime import datetime\n",
    "import mlflow\n",
    "import pymysql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Jupyter magic template to create Python file with variable substitution \n",
    "from IPython.core.magic import register_line_cell_magic\n",
    "@register_line_cell_magic\n",
    "def writetemplate(line, cell=None):\n",
    "    \"\"\"Render the cell body with str.format(**globals()) and write it to a file.\n",
    "\n",
    "    Every {NAME} placeholder in the body is replaced by the notebook global of\n",
    "    that name. Literal braces in the template must be doubled ({{ and }}).\n",
    "\n",
    "    Args:\n",
    "        line: Text after the magic name; used as the target file path.\n",
    "        cell: Template text. None when the magic is invoked as a line magic.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: If no file name or no cell body was given.\n",
    "        KeyError: If the template references a name that is not a global.\n",
    "    \"\"\"\n",
    "    target = line.strip()\n",
    "    if not target:\n",
    "        raise ValueError(\"Usage: %%writetemplate <filename>\")\n",
    "    if cell is None:\n",
    "        raise ValueError(\"writetemplate needs a cell body; use it as %%writetemplate <filename>\")\n",
    "    # Render before opening the file so a bad placeholder does not truncate\n",
    "    # an existing target and leave an empty file behind.\n",
    "    rendered = cell.format(**globals())\n",
    "    with open(target, 'w') as f:\n",
    "        f.write(rendered)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Experiment and environment settings shared by the cells below.\n",
    "experiment_name = \"caipt-test\"\n",
    "mlflow.set_experiment(experiment_name)\n",
    "\n",
    "mlflow_tracking_uri = mlflow.get_tracking_uri()\n",
    "MLFLOW_EXPERIMENTS_URI = os.environ['MLFLOW_EXPERIMENTS_URI']\n",
    "training_artifacts_uri = MLFLOW_EXPERIMENTS_URI+\"/caip-training\"\n",
    "REGION=os.environ['MLOPS_REGION']\n",
    "ML_IMAGE_URI=os.environ['ML_IMAGE_URI']\n",
    "\n",
    "# The connection string embeds the database password (it is parsed out of it\n",
    "# later in this notebook), so mask it before it is stored in the cell output.\n",
    "masked_sql_connection_str = re.sub(\n",
    "    r'(?<=://)([^:@/]*):.*@', r'\\1:****@',\n",
    "    os.environ['MLFLOW_SQL_CONNECTION_STR'], flags=re.DOTALL)\n",
    "\n",
    "print(f\"MLflow tracking server URI: {mlflow_tracking_uri}\")\n",
    "print(f\"MLflow artifacts store root: {MLFLOW_EXPERIMENTS_URI}\")\n",
    "print(f\"MLflow SQL connection name: {os.environ['MLFLOW_SQL_CONNECTION_NAME']}\")\n",
    "print(f\"MLflow SQL connection string: {masked_sql_connection_str}\")\n",
    "\n",
    "display(HTML('<hr>You can check results of this test in MLflow and GCS folder:'))\n",
    "display(HTML('<h4><a href=\"{}\" rel=\"noopener noreferrer\" target=\"_blank\">Click to open MLflow UI</a></h4>'.format(os.environ['MLFLOW_TRACKING_EXTERNAL_URI'])))\n",
    "display(HTML('<h4><a href=\"https://console.cloud.google.com/storage/browser/{}\" rel=\"noopener noreferrer\" target=\"_blank\">Click to open GCS folder</a></h4>'.format(MLFLOW_EXPERIMENTS_URI.replace('gs://',''))))\n",
    "\n",
    "# Folder that receives the training module files written by the next cells.\n",
    "!mkdir -p ./package/training"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.1. Create model trainer file\n",
    "The following cells will write out python module files that will be sent as a training module to Cloud AI Platform Training.\n",
    "At first, we implement a simple Scikit-learn model training routine."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile ./package/training/task.py\n",
    "\n",
    "import mlflow\n",
    "import mlflow.sklearn\n",
    "import numpy as np\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "import sys\n",
    "import argparse\n",
    "import os\n",
    "\n",
    "def train_model(args):\n",
    "    \"\"\"Fit a logistic regression on a tiny fixed dataset and log it to MLflow.\n",
    "\n",
    "    Logs the training accuracy as the 'score' metric and the fitted estimator\n",
    "    as the 'model' artifact of a new (nested) MLflow run.\n",
    "\n",
    "    Args:\n",
    "        args: Parsed arguments; args.epochs is used as the solver's max_iter.\n",
    "    \"\"\"\n",
    "    print(\"Regularized logistic regression model train step started...\")\n",
    "    with mlflow.start_run(nested=True):\n",
    "        X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)\n",
    "        y = np.array([0, 0, 1, 1, 1, 0])\n",
    "        # args.epochs is a training job parameter\n",
    "        lr = LogisticRegression(max_iter=args.epochs)\n",
    "        lr.fit(X, y)\n",
    "        score = lr.score(X, y)\n",
    "        mlflow.log_metric(\"score\", score)\n",
    "        mlflow.sklearn.log_model(lr, \"model\")\n",
    "    print(\"LogisticRegression training finished.\")\n",
    "\n",
    "def training_data(local_data):\n",
    "    \"\"\"Print the directory listing of local_data.\"\"\"\n",
    "    dircontent = os.listdir(local_data)\n",
    "    print(f\"Check local data @: {local_data} :\\n{dircontent}\")\n",
    "    \n",
    "def upload_data(local, job_dir):\n",
    "    \"\"\"Placeholder: only prints the intended upload; nothing is copied.\"\"\"\n",
    "    print(f\"Upload local data {local} to GCS: {job_dir}\")\n",
    "\n",
    "def main():\n",
    "    \"\"\"Parse the job arguments, point MLflow at the local server and train.\"\"\"\n",
    "    # The f-string is already fully rendered; a further str.format() pass would\n",
    "    # raise on arguments that contain '{' or '}'.\n",
    "    print(f'Training arguments: {\" \".join(sys.argv[1:])}')\n",
    "    parser = argparse.ArgumentParser()\n",
    "    # Default matches scikit-learn's own max_iter default, so omitting --epochs\n",
    "    # no longer passes None to LogisticRegression.\n",
    "    parser.add_argument('--epochs', type=int, default=100)\n",
    "    parser.add_argument('--job-dir', type=str)\n",
    "    parser.add_argument('--local_data', type=str)\n",
    "    # Arguments not declared above are ignored instead of aborting the job.\n",
    "    args, unknown_args = parser.parse_known_args()\n",
    "\n",
    "    # CLOUD_ML_JOB contains other CAIP Training runtime parameters in JSON object\n",
    "    # job = os.environ['CLOUD_ML_JOB']\n",
    "    \n",
    "    # MLflow locally available\n",
    "    mlflow.set_tracking_uri('http://127.0.0.1:80')\n",
    "    mlflow.set_experiment(\"caipt-test\")\n",
    "\n",
    "    # Data already downloaded from GCS to 'local_data' folder if --data_source argument provided \n",
    "    # in 'ai-platform jobs submit training' command\n",
    "    if args.local_data:\n",
    "        training_data(args.local_data)\n",
    "\n",
    "    print('Training main started')\n",
    "    train_model(args)\n",
    "\n",
    "    # if --job-dir provided in 'ai-platform jobs submit' command you can upload any training result to that\n",
    "    # if args.job_dir:\n",
    "    #     upload_data(args.local_data, args.job_dir)\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    main()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create an empty `__init__.py` file, which is needed for the training module."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%writefile ./package/training/__init__.py\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create `setup.py` to ensure the MLflow and PyMySQL modules are installed with the training package."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile ./package/setup.py\n",
    "\"\"\"Packaging script for the training module and its pinned dependencies.\"\"\"\n",
    "from setuptools import find_packages, setup\n",
    "\n",
    "# Pinned packages installed together with the training package.\n",
    "REQUIRED_PACKAGES = [\n",
    "    'mlflow==1.13.1',\n",
    "    'PyMySQL==0.9.3',\n",
    "]\n",
    "\n",
    "setup(\n",
    "    name='trainer',\n",
    "    version='0.1',\n",
    "    description='Customer training setup.',\n",
    "    packages=find_packages(),\n",
    "    include_package_data=True,\n",
    "    install_requires=REQUIRED_PACKAGES,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.2. Submit training job\n",
    "Note: Every run of this notebook cell creates a new training job!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Timestamp used as suffix of both the job name and the job's output folder.\n",
    "submit_time = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
    "JOB_NAME=f\"training_job_{submit_time}\"\n",
    "JOB_DIR=f\"{training_artifacts_uri}/training_{submit_time}\"\n",
    "print(f\"Training job name: '{JOB_NAME}' will run in {REGION} region using image from:\\n {ML_IMAGE_URI}\\n\")\n",
    "\n",
    "# Arguments after the bare \"--\" are passed on to the training module itself.\n",
    "# NOTE(review): task.py does not declare --mlflowuri; it is tolerated only because\n",
    "# main() uses parse_known_args - confirm whether the argument is still needed.\n",
    "!gcloud ai-platform jobs submit training {JOB_NAME} \\\n",
    "  --region {REGION} \\\n",
    "  --scale-tier BASIC \\\n",
    "  --job-dir {JOB_DIR} \\\n",
    "  --package-path ./package/training/ \\\n",
    "  --module-name training.task \\\n",
    "  --master-image-uri {ML_IMAGE_URI} \\\n",
    "  -- \\\n",
    "  --mlflowuri {MLFLOW_EXPERIMENTS_URI} \\\n",
    "  --epochs 2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1.3 Wait for job done\n",
    "After you submit your job, you can monitor the job status"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Print the description of the job submitted above (JOB_NAME comes from the submit cell).\n",
    "!gcloud ai-platform jobs describe {JOB_NAME}\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Training logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show the log output of the submitted job in the cell output.\n",
    "!gcloud ai-platform jobs stream-logs {JOB_NAME}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Cloud AI Platform Training test results\n",
    "Examine the logged entries in Cloud SQL and the produced artifacts in Cloud Storage through MLflow tracking."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract user and password from a connection string shaped like\n",
    "#   mysql+pymysql://<user>:<password>@127.0.0.1:3306/mlflow\n",
    "# The user group stops at the first ':' so a password containing ':' is kept whole,\n",
    "# and the dots of the host are matched literally.\n",
    "sqlauth = re.search(\n",
    "    r'mysql\\+pymysql://(?P<user>[^:]*):(?P<psw>.*)@127\\.0\\.0\\.1:3306/mlflow',\n",
    "    os.environ['MLFLOW_SQL_CONNECTION_STR'], re.DOTALL)\n",
    "if sqlauth is None:\n",
    "    raise ValueError(\"MLFLOW_SQL_CONNECTION_STR is not of the form mysql+pymysql://<user>:<password>@127.0.0.1:3306/mlflow\")\n",
    "connection = pymysql.connect(\n",
    "    host='127.0.0.1',\n",
    "    port=3306,\n",
    "    database='mlflow',\n",
    "    user=sqlauth.group('user'),\n",
    "    password=sqlauth.group('psw'),\n",
    "    # Without autocommit the first SELECT opens a transaction that stays open;\n",
    "    # later queries on this connection (section 4) would then presumably keep\n",
    "    # reading the same snapshot and miss runs logged in the meantime.\n",
    "    autocommit=True\n",
    ")\n",
    "cursor = connection.cursor() "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.2. Retrieve experiment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Bind the name as a query parameter instead of formatting it into the SQL text.\n",
    "cursor.execute(\"SELECT * FROM experiments where name=%s ORDER BY experiment_id desc LIMIT 1\", (experiment_name,))\n",
    "experiment_row = cursor.fetchone()\n",
    "if experiment_row is None:\n",
    "    # Reset so later cells cannot silently reuse an ID left over from an earlier execution.\n",
    "    experiment_id = None\n",
    "    print(\"Experiment not found\")\n",
    "else:\n",
    "    # The first column of the row is used as the experiment ID.\n",
    "    experiment_id = experiment_row[0]\n",
    "    print(f\"'{experiment_name}' experiment ID: {experiment_id}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.3. Query runs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Most recent run of the experiment; the ID is bound as a query parameter.\n",
    "cursor.execute(\"SELECT * FROM runs where experiment_id=%s ORDER BY start_time desc LIMIT 1\", (experiment_id,))\n",
    "entity = cursor.fetchone()\n",
    "if entity is None:\n",
    "    # Reset so later cells cannot silently reuse a run ID from an earlier execution.\n",
    "    run_uuid = None\n",
    "    print(\"No runs found\")\n",
    "else:\n",
    "    # The first column of the row is used as the run ID.\n",
    "    run_uuid = entity[0]\n",
    "    print(f\"Last run id of '{experiment_name}' experiment is: {run_uuid}\\n\")\n",
    "    print(entity)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.4. Query metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# All metric rows logged for the selected run; the run ID is bound as a query parameter.\n",
    "cursor.execute(\"SELECT * FROM metrics where run_uuid = %s\", (run_uuid,))\n",
    "metric_rows = cursor.fetchall()\n",
    "if not metric_rows:\n",
    "    print(\"No metrics found\")\n",
    "else:\n",
    "    for entry in metric_rows:\n",
    "        print(entry)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.5. List the artifacts in Cloud Storage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the objects under <artifact root>/<experiment_id>/<run_uuid>/artifacts/model\n",
    "# (experiment_id and run_uuid come from the query cells above).\n",
    "!gsutil ls {MLFLOW_EXPERIMENTS_URI}/{experiment_id}/{run_uuid}/artifacts/model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Submitting a workflow to Composer to run training in Cloud AI Platform training\n",
    "This section will test a training job submitted from Composer workflow by reusing training module\n",
    "created in the 1.1. section earlier. Therefore the training metrics and artifacts will be stored in the \n",
    "same 'caipt-test' MLFlow experiment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Composer environment name and region are read from environment variables.\n",
    "COMPOSER_NAME=os.environ['MLOPS_COMPOSER_NAME']\n",
    "REGION=os.environ['MLOPS_REGION']\n",
    "\n",
    "# New timestamp: this section gets its own job name and output folder.\n",
    "# training_artifacts_uri and ML_IMAGE_URI are defined in the configuration cell of section 1.\n",
    "submit_time = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
    "JOB_NAME=f\"training_job_{submit_time}\"\n",
    "JOB_DIR=f\"{training_artifacts_uri}/training_{submit_time}\"\n",
    "print(f\"Training job name: '{JOB_NAME}' will run in {REGION} region using image from:\\n {ML_IMAGE_URI}\\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.1. Importing existing training module\n",
    "\n",
    "Upload local training /package folder to Composer's GCS bucket.\n",
    "See more details about [data import](https://cloud.google.com/sdk/gcloud/reference/composer/environments/storage/data/import) and [Composer's folder structure](https://cloud.google.com/composer/docs/concepts/cloud-storage)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copy the local ./package folder into the Composer environment's data storage,\n",
    "# under the test-sklearn-mlflow-caipt destination folder.\n",
    "!gcloud composer environments storage data import \\\n",
    "    --environment {COMPOSER_NAME} \\\n",
    "    --location {REGION} \\\n",
    "    --source ./package \\\n",
    "    --destination test-sklearn-mlflow-caipt"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.2. Uploading the Airflow workflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writetemplate test-sklearn-mlflow-caipt.py\n",
    "\n",
    "\"\"\"Airflow DAG with a bash task that submits the test training job via gcloud.\"\"\"\n",
    "from datetime import timedelta\n",
    "\n",
    "import airflow\n",
    "from airflow.operators.bash_operator import BashOperator\n",
    "from airflow.operators.dummy_operator import DummyOperator\n",
    "\n",
    "# Command line executed by the bash task.\n",
    "command=\"\"\"gcloud ai-platform jobs submit training {JOB_NAME} \\\n",
    "--region {REGION} \\\n",
    "--scale-tier BASIC \\\n",
    "--job-dir {JOB_DIR} \\\n",
    "--package-path /home/airflow/gcs/data/test-sklearn-mlflow-caipt/package/training/ \\\n",
    "--module-name training.task \\\n",
    "--master-image-uri {ML_IMAGE_URI} \\\n",
    "-- \\\n",
    "--mlflowuri {MLFLOW_EXPERIMENTS_URI} \\\n",
    "--epochs 2\"\"\"\n",
    "print(command)\n",
    "\n",
    "default_args = dict(\n",
    "    retries=1,\n",
    "    start_date=airflow.utils.dates.days_ago(0),\n",
    ")\n",
    "\n",
    "# Only runs when triggered: no schedule interval is set.\n",
    "dag = airflow.DAG(\n",
    "    \"test_sklearn_mlflow_caipt\",\n",
    "    default_args=default_args,\n",
    "    schedule_interval=None,\n",
    "    dagrun_timeout=timedelta(minutes=15),\n",
    ")\n",
    "\n",
    "start = DummyOperator(task_id=\"dummy_task\", dag=dag)\n",
    "\n",
    "submit_job = BashOperator(\n",
    "    task_id=\"test_sklearn_mlflow_caipt\",\n",
    "    bash_command=command,\n",
    "    dag=dag,\n",
    ")\n",
    "\n",
    "# The bash task runs after the dummy task.\n",
    "submit_job.set_upstream(start)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Upload the DAG file written by the previous cell to the Composer environment.\n",
    "!gcloud composer environments storage dags import \\\n",
    "  --environment {COMPOSER_NAME}  \\\n",
    "  --location {REGION} \\\n",
    "  --source test-sklearn-mlflow-caipt.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Check imported Dag"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the DAG files stored for the Composer environment; the uploaded file should appear here.\n",
    "!gcloud composer environments storage dags list \\\n",
    "  --environment {COMPOSER_NAME}  --location {REGION}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3.3. Triggering the workflow\n",
    "After the first import of the Airflow DAG, please wait 30-60 seconds before triggering the workflow."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run the Airflow 'unpause' sub-command for the test_sklearn_mlflow_caipt DAG in the Composer environment.\n",
    "!gcloud composer environments run {COMPOSER_NAME} \\\n",
    "    --location {REGION} unpause -- test_sklearn_mlflow_caipt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run the Airflow 'trigger_dag' sub-command for the test_sklearn_mlflow_caipt DAG in the Composer environment.\n",
    "!gcloud composer environments run {COMPOSER_NAME} \\\n",
    "    --location {REGION} trigger_dag -- test_sklearn_mlflow_caipt"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Cloud AI Platform Training through Cloud Composer test results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# End any transaction left open by the earlier SELECTs on this connection, so the\n",
    "# queries below read current data and can see the run logged by the Composer job.\n",
    "# (Harmless no-op if the connection is already in autocommit mode.)\n",
    "connection.commit()\n",
    "cursor = connection.cursor()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4.1 Retrieve experiment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "experiment_name = \"caipt-test\"\n",
    "# Bind the name as a query parameter instead of formatting it into the SQL text.\n",
    "cursor.execute(\"SELECT * FROM experiments where name=%s ORDER BY experiment_id desc LIMIT 1\", (experiment_name,))\n",
    "experiment_row = cursor.fetchone()\n",
    "if experiment_row is None:\n",
    "    # Reset so later cells cannot silently reuse the ID found in section 2.\n",
    "    experiment_id = None\n",
    "    print(\"Experiment not found\")\n",
    "else:\n",
    "    # The first column of the row is used as the experiment ID.\n",
    "    experiment_id = experiment_row[0]\n",
    "    print(f\"'{experiment_name}' experiment ID: {experiment_id}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4.2 Query runs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Most recent run of the experiment; the ID is bound as a query parameter.\n",
    "cursor.execute(\"SELECT * FROM runs where experiment_id=%s ORDER BY start_time desc LIMIT 1\", (experiment_id,))\n",
    "entity = cursor.fetchone()\n",
    "if entity is None:\n",
    "    # Reset so later cells cannot silently reuse the run ID found in section 2.\n",
    "    run_uuid = None\n",
    "    print(\"No runs found\")\n",
    "else:\n",
    "    # The first column of the row is used as the run ID.\n",
    "    run_uuid = entity[0]\n",
    "    print(f\"Last run id of '{experiment_name}' experiment is: {run_uuid}\\n\")\n",
    "    print(entity)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4.3 Query metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# All metric rows logged for the selected run; the run ID is bound as a query parameter.\n",
    "cursor.execute(\"SELECT * FROM metrics where run_uuid = %s\", (run_uuid,))\n",
    "metric_rows = cursor.fetchall()\n",
    "if not metric_rows:\n",
    "    print(\"No metrics found\")\n",
    "else:\n",
    "    for entry in metric_rows:\n",
    "        print(entry)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4.4. List the artifacts in Cloud Storage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the objects under <artifact root>/<experiment_id>/<run_uuid>/artifacts/model\n",
    "# (experiment_id and run_uuid come from the section 4 query cells above).\n",
    "!gsutil ls {MLFLOW_EXPERIMENTS_URI}/{experiment_id}/{run_uuid}/artifacts/model"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "r-cpu.3-6.m49",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/r-cpu.3-6:m49"
  },
  "kernelspec": {
   "display_name": "Python 3.8.2 64-bit",
   "metadata": {
    "interpreter": {
     "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
    }
   },
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.2-final"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}